package com.ujsh.rxjava.sample;

import android.content.Context;
import android.support.test.InstrumentationRegistry;
import android.support.test.runner.AndroidJUnit4;
import android.util.Log;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.reactivestreams.Publisher;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Scheduler;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.schedulers.Schedulers;

import static org.junit.Assert.*;

/**
 * Instrumentation test, which will execute on an Android device.
 *
 * @see <a href="http://d.android.com/tools/testing">Testing documentation</a>
 */
@RunWith(AndroidJUnit4.class)
public class ExampleInstrumentedTest {
    @Test
    public void useAppContext() throws Exception {
        // Context of the app under test.
        Context appContext = InstrumentationRegistry.getTargetContext();
        assertEquals("com.ujsh.rxjava.sample", appContext.getPackageName());
    }

    /**
     * 创建一个简单的RxJava
     */
    @Test
    public void createRxJava() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                System.out.println("rxjava--获取执行线程信息：" + Thread.currentThread().getId() + "  name:" + Thread.currentThread().getName());
                String strs[] = new String[]{"1", "2", "3", "4"};

                boolean isNext = true;
                int position = 0;
                while (true) {
                    if (strs.length == position) {
                        e.onComplete();
                        isNext = false;
                        return;
                    } else {
                        e.onNext(strs[position]);


                    }
                    position++;
                    //throw  new RuntimeException("出现异常");
                }

            }
        }).subscribeOn(Schedulers.io())//设置订阅者执行的线程
                .observeOn(AndroidSchedulers.mainThread())//设置观察者执行的线程
                .subscribe(new Consumer<String>() {//开始订阅，只有订阅之后才会执行订阅方法
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println("rxjava--values：" + s + "\n");
                        System.out.println("rxjava--获取回调线程信息：" + Thread.currentThread().getId() + "  name:" + Thread.currentThread().getName());
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        System.err.println("rxjava--错误rxjava--获取回调线程信息：" + Thread.currentThread().getId() + "  name:" + Thread.currentThread().getName());
                        System.err.println("rxjava--rxjava-->" + throwable.getLocalizedMessage());
                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        System.out.println("rxjava--完成rxjava--获取回调线程信息：" + Thread.currentThread().getId() + "  name:" + Thread.currentThread().getName());
                    }
                });
    }

    /**
     * 通过一个数组创建RxJava
     */
    @Test
    public void createRxJavaByArray() {
        String[] strs = {"I", "am", "a", "person"};
        Observable.fromArray(strs).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("rxjava--" + s);
            }
        });

    }

    /**
     * 通过Iterable的子类创建RxJava
     */
    @Test
    public void createRxJavaByIterable() {
        List<String> list = new ArrayList<>();
        list.add("I");
        list.add("am");
        list.add("a");
        list.add("person!");
        Observable.fromIterable(list).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("rxjava--" + s);
            }
        });

    }

    /**
     * 此方法返回一个每隔指定的时间间隔就发射一个序号的 Observable 对象，可用来做倒计时心跳包等操作，无限发送，除非调用dispose()可以终止。
     */
    @Test
    public void interval() {
        //开始不延时，之后每隔一秒发送一次序列号从 0 开始，每次累加 1如0 1 2 3 4 5.....
        Observable.interval(0, 1, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        System.out.println("rxjava-------" + aLong);

                        if (aLong == 10) {

                        }

                    }
                });

    }


    /**
     * 创建一个在指定延迟时间后发射一条数据（ 固定值：0 ）的 Observable 对象，可用来做定时操作。
     */
    @Test
    public void timer() {
        //3秒后会输出0
        Observable.timer(3, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        System.out.println("rxjava-------" + aLong);
                    }
                });
    }


    /**
     * 此方法可以发射一个指定范围的数
     */
    @Test
    public void range() {

        //会直接输出2 3 4 5 6
        Observable.range(2, 5).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println("rxjava-------" + integer);
            }
        });
    }

    /**
     * map 操作符是可以将返回的数据变换成别的数据类新，比如你可以使用 map 操作符将原来要发射的字符串数据变换成数值型在发射出去。
     */
    @Test
    public void mapRxJava() {
        List<String> list = new ArrayList<>();
        list.add("I");
        list.add("am");
        list.add("a");
        list.add("person");
        //这里使用map将重新生成一个Integer类型的Observabl最终打印出来的是每个字符串的hashCode值
        Observable.fromIterable(list)
                //转换数据类型
                .map(new Function<String, Integer>() {
                    @Override
                    public Integer apply(String s) throws Exception {
                        return s.hashCode();
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println("rxjava------>" + integer);
                    }
                });
    }

    /**
     * 不同于map的是flatMap 返回的是一个全新的Observable 对象。
     * 这里使用网上的一个例子，将原本List<Integer>的被观察者数据类型转换成了Integer的被观察者数据类型。
     * 这里Flowable和Observable本质是一个东西，Flowable是Rx2.0里的东西为了解决一种叫做背压的问题而诞生的。
     */
    @Test
    public void flatMapRxJava() {
        List<Integer> list = new ArrayList<>();
        list.add(10);
        list.add(1);
        list.add(5);
        Flowable.just(list)
                //转换数据结构
                .flatMap(new Function<List<Integer>, Publisher<Integer>>() {
                    @Override
                    public Publisher<Integer> apply(List<Integer> integers) throws Exception {

                        return Flowable.fromIterable(integers);
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println("rxjava------>"+integer);
            }
        });
    }

    /**
     * 这个操作符可以作为数据筛选器，帮你过滤不想要的数据。
     *下面的例子当数据>3才能放过，因此输出4 5
     */

    @Test
    public void filterRxJava() {
        Observable.fromArray(1,2,-3,4,5)
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {

                        return integer>3;
                    }
                })

                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println("rxjava------>"+integer);
                    }
                });
    }
    /**
     * 此操作符用于指定想要的数据数量,下面的例子最终只输出1和2
     */

    @Test
    public void takeRxJava() {
        Observable.fromArray(1,2,-3,4,5)
                .take(4)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println("rxjava------>："+integer);
                    }
                });
    }

    /**
     *此操作符可以在消费者也就是观察者 接收到数据之前做事。
     */

    @Test
    public void doOnNextRxJava() {
        Observable.fromArray(1,2,-3,4,5)
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println("rxjava------>准备工作："+integer);

                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println("rxjava------>"+integer);
                    }
                });
    }
    /**
     * 此操作符可以在消费者也就是观察者 接收到数据之后做事。
     */

    @Test
    public void doAfterNextRxJava() {
        Observable.fromArray(1,2,-3,4,5)
                .doAfterNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println("rxjava------>数据接收完成："+integer);
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println("rxjava------>"+integer);
                    }
                });
    }

    /**
     * 此操作符可以在消费者也就是被观察者出现异常之前，收到处理事件，可以用在保存错误信息。
     */

    @Test
    public void doErrorRxJava() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                String strs[] = new String[]{"1", "2", "3", "4"};
                boolean isNext = true;
                int position = 0;
                while (true) {
                    if (strs.length == position) {
                        e.onComplete();
                        isNext = false;
                        return;
                    } else {
                        e.onNext(strs[position]);
                    }
                    position++;
                    throw  new RuntimeException("出现异常");
                }
            }
        }).doOnError(new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                System.err.println("rxjava--->即将出现崩溃" + throwable.getLocalizedMessage());
            }
        }).subscribe(new Consumer<String>() {//开始订阅，只有订阅之后才会执行订阅方法
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println("rxjava--values：" + s + "\n");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        System.err.println("rxjava--错误rxjava--获取回调线程信息：" + Thread.currentThread().getId() + "  name:" + Thread.currentThread().getName());
                        System.err.println("rxjava--rxjava-->" + throwable.getLocalizedMessage());
                    }
                });
    }

    /**
     * doOnComplete 此操作符可以在消费者也就是被观察者所有事件处理完成以后发出事件，要比subscribe（订阅者）中的doOnComplete先执行
     */

    @Test
    public void doOnCompleteRxJava() {
        Observable.fromArray(1,2,-3,4,5)
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        System.out.println("rxjava------>数据即将处理完成" );
                    }
                })
                .subscribe(new Consumer<Integer>() {
                               @Override
                               public void accept(Integer integer) throws Exception {
                                   System.out.println("rxjava------>" + integer);
                               }
                           }, new Consumer<Throwable>() {
                               @Override
                               public void accept(Throwable throwable) throws Exception {

                               }
                           }
                        , new Action() {
                            @Override
                            public void run() throws Exception {
                                System.out.println("rxjava------>数据即已经完成" );
                            }
                        });
    }



}
